package com.ouyunc.mq.config.kafka.builder.impl;


import com.ouyunc.mq.config.kafka.builder.KafkaMqBuilder;
import com.ouyunc.mq.config.kafka.strategy.KafkaMqStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.List;

/**
 * @Author fangzhenxun
 * @Description KafkaTemplate 建造者具体建造类
 * org.springframework.boot.autoconfigure.kafka.KafkaProperties kafka 的参数属行的配置
 * @Date 2020/3/13 11:05
 **/
@Slf4j
@Configuration
public class KafkaTemplateBuilder implements KafkaMqBuilder<KafkaTemplate> {


    /**
     * 获取当前选中的mq使用模式类型，如果没有设置primary则默认为单例模式类型
     **/
    @Value("${mq.kafka.primary:STANDALONE}")
    private String type;


    /**
     * 获取所有rabbitmq的模式策略
     **/
    @Autowired
    private List<KafkaMqStrategy> kafkaMqStrategyList;


    /**
     * @Author fangzhenxun
     * @Description
     * @Date 2020/3/13 11:06
     * @param
     * @return org.springframework.kafka.core.KafkaTemplate
     **/
    @Override
    public KafkaTemplate build() {
        KafkaTemplate kafkaTemplate = null;
        if (null == kafkaTemplate) {
            //获取当前选中的配置策略
            KafkaMqStrategy kafkaMqStrategy = currentKafkaMqStrategy();
            //构建生产者工厂
            ProducerFactory producerFactory = kafkaMqStrategy.buildProducerFactory();
            //创建kafka操作模版
            kafkaTemplate = new KafkaTemplate(producerFactory);
        }
        return kafkaTemplate;
    }




    /**
     * @Author fangzhenxun
     * @Description  获取当前选中的rabbitmq
     * @date 2020/2/29 17:53
     * @param
     * @return com.xyt.mq.config.rabbitmq.stratety.RabbitMqStrategy
     */
    private KafkaMqStrategy currentKafkaMqStrategy() {
        if (kafkaMqStrategyList != null && !kafkaMqStrategyList.isEmpty()) {
            return kafkaMqStrategyList.parallelStream().filter(mqStrategy -> {
                String mqModel = mqStrategy.getType().name();
                if (type.equals(mqModel)) {
                    log.info("当前kafkaTemplate加载模式为========》" + mqStrategy.getType().getMqModel());
                }
                return type.equals(mqModel);
            }).findAny().orElseThrow(() ->new RuntimeException("没有找到对应的配置方式"));
        }
        throw new RuntimeException("没有找到对应的配置方式");
    }
}
